{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "c21deb80-9cf7-4185-8205-a38110152d2c",
   "metadata": {},
   "source": [
    "# Kafka\n",
    "\n",
    "[Kafka](https://github.com/apache/kafka) is a distributed messaging system that is used to publish and subscribe to streams of records. \n",
    "This demo shows how to use `KafkaChatMessageHistory` to store and retrieve chat messages from a Kafka cluster."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c7c4fc02-18ac-4285-b8d6-507357e2aa13",
   "metadata": {},
   "source": [
    "A running Kafka cluster is required to run the demo. You can follow this [instruction](https://developer.confluent.io/get-started/python) to create a Kafka cluster locally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f09f3b45-c4ff-4e59-bf79-238cc85d6465",
   "metadata": {},
   "outputs": [],
   "source": [
    "from langchain_community.chat_message_histories import KafkaChatMessageHistory\n",
    "\n",
    "chat_session_id = \"chat-message-history-kafka\"\n",
    "bootstrap_servers = \"localhost:64797\"  # host:port. `localhost:Plaintext Ports` if setup Kafka cluster locally\n",
    "history = KafkaChatMessageHistory(\n",
    "    chat_session_id,\n",
    "    bootstrap_servers,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "109812d2-85c5-4a65-a8a0-2d16eb80347b",
   "metadata": {},
   "source": [
    "Optional parameters to construct `KafkaChatMessageHistory`:\n",
    " - `ttl_ms`: Time to live in milliseconds for the chat messages.\n",
    " - `partition`: Number of partition of the topic to store the chat messages.\n",
    " - `replication_factor`: Replication factor of the topic to store the chat messages."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c8fba39f-650b-4192-94ea-1a2a89f5348d",
   "metadata": {},
   "source": [
    "`KafkaChatMessageHistory` internally uses Kafka consumer to read chat messages, and it has the ability to mark the consumed position persistently. It has following methods to retrieve chat messages:\n",
    "- `messages`: continue consuming chat messages from last one.\n",
    "- `messages_from_beginning`: reset the consumer to the beginning of the history and consume messages. Optional parameters:\n",
    "    1. `max_message_count`: maximum number of messages to read.\n",
    "    2. `max_time_sec`: maximum time in seconds to read messages.\n",
    "- `messages_from_latest`: reset the consumer to the end of the chat history and try consuming messages. Optional parameters same as above.\n",
    "- `messages_from_last_consumed`: return messages continuing from the last consumed message, similar to `messages`, but with optional parameters.\n",
    "\n",
    "`max_message_count` and `max_time_sec` are used to avoid blocking indefinitely when retrieving messages.\n",
    "As a result, `messages` and other methods to retrieve messages may not return all messages in the chat history. You will need to specify `max_message_count` and `max_time_sec` to retrieve all chat history in a single batch.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "caf2176b-db7a-451a-a292-3d9fde585ded",
   "metadata": {},
   "source": [
    "Add messages and retrieve."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "7e52a70d-3921-4614-b8cd-53b8d3c2deb4",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[HumanMessage(content='hi!'), AIMessage(content='whats up?')]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "history.add_user_message(\"hi!\")\n",
    "history.add_ai_message(\"whats up?\")\n",
    "\n",
    "history.messages"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "874ce388-da8f-4796-b9ca-3ac114195b10",
   "metadata": {},
   "source": [
    "Calling `messages` again returns an empty list because the consumer is at the end of the chat history."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "f863618e-7da1-4f46-9182-7a1387b93b16",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "history.messages"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e108255b-c240-44f7-9ecc-52bf04cd15b6",
   "metadata": {},
   "source": [
    "Add new messages and continue consuming."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "31aa7403-5392-4ad4-ba43-226020a274e3",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[HumanMessage(content='hi again!'), AIMessage(content='whats up again?')]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "history.add_user_message(\"hi again!\")\n",
    "history.add_ai_message(\"whats up again?\")\n",
    "history.messages"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5062fabc-c605-40dd-933b-c68de2727874",
   "metadata": {},
   "source": [
    "To reset the consumer and read from beginning:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "005816ae-c8ed-4e41-9ecd-b6432578c8f1",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[HumanMessage(content='hi again!'),\n",
       " AIMessage(content='whats up again?'),\n",
       " HumanMessage(content='hi!'),\n",
       " AIMessage(content='whats up?')]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "history.messages_from_beginning()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "42cc7bed-5cd7-417f-94fd-fe2e511cc9c6",
   "metadata": {},
   "source": [
    "Set the consumer to the end of the chat history, add a couple of new messages, and consume:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "d8b4f1cd-fa47-461b-b1b6-278ad54e9ac5",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[HumanMessage(content='HI!'), AIMessage(content='WHATS UP?')]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "history.messages_from_latest()\n",
    "history.add_user_message(\"HI!\")\n",
    "history.add_ai_message(\"WHATS UP?\")\n",
    "history.messages"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.18"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
